Compression & Entropy

In [1]:
# Load Python libraries
import numpy as np
import math
from collections import Counter
from scipy.stats import entropy
In [2]:
# Load Plot libraries
import seaborn as sns
import matplotlib.pyplot as plt

Huffman Code class

In [3]:
# Class HuffmanCode from scratch
class HuffmanCode:
    """Huffman coding built from scratch.

    Builds a prefix-free binary code for a discrete distribution given as a
    dict mapping symbol -> weight. Symbols are expected to be strings:
    merged symbols are represented by string concatenation internally.
    """

    # Return a Huffman code for an ensemble with distribution p
    def get_code(self, p_symbols):
        """Return a dict mapping each symbol to its Huffman codeword.

        p_symbols: dict of symbol -> weight (weights need not sum to 1).
        The caller's dict is left unmodified: normalization happens on a copy.
        """
        # Init validation
        n = len(p_symbols)
        if n == 0:
            return dict()
        elif n == 1:
            # A single symbol still needs one bit to be representable
            return dict(zip(p_symbols.keys(), ['1']))

        # Work on a copy so the caller's distribution is never mutated
        p = dict(p_symbols)

        # Ensure probabilities sum to 1
        self._normalize_weights(p)

        # Returns Huffman code
        return self._get_code(p)

    # (Private) Calculate Huffman code recursively
    def _get_code(self, p):
        # Base case of only two symbols, assign 0 or 1 arbitrarily
        if len(p) == 2:
            return dict(zip(p.keys(), ['0', '1']))

        # Create a new distribution by merging the lowest-probability pair
        p_prime = p.copy()
        s1, s2 = self._get_lowest_prob_pair(p)
        p1, p2 = p_prime.pop(s1), p_prime.pop(s2)
        p_prime[s1 + s2] = p1 + p2

        # Recurse on the smaller distribution, then split the merged
        # symbol's codeword into the two original leaves
        code = self._get_code(p_prime)
        s1s2 = code.pop(s1 + s2)
        code[s1], code[s2] = s1s2 + '0', s1s2 + '1'

        return code

    # Return pair of symbols from distribution p with lowest probabilities
    def _get_lowest_prob_pair(self, p):
        # Ensure there are at least 2 symbols in the dist.
        if len(p) >= 2:
            # sorted() is stable, so ties keep insertion order
            sorted_p = sorted(p.items(), key=lambda x: x[1])
            return sorted_p[0][0], sorted_p[1][0]

        return (None, None)

    # (Private) Scale all weights in place so they add up to t_weight.
    # Only ever called on an internal copy, never on caller data.
    def _normalize_weights(self, p_symbols, t_weight=1.0):
        n = sum(p_symbols.values())

        if n != t_weight:
            for s in p_symbols:
                p_symbols[s] = p_symbols[s] / n
In [4]:
# Create a shared Huffman Code instance (used later by get_compress_file)
hc = HuffmanCode()

1. Compression with current Entropy

In [5]:
# Function - Read file in low level (Bytes)
def get_file_bytes(file_path):
    """Read a file in binary mode and return its content as a bytearray.

    The previous trailing `return None` was unreachable (the `with` block
    always returns) and has been removed.
    """
    with open(file_path, 'rb') as f:
        return bytearray(f.read())
In [6]:
# Function - Return shannon entropy
def entropy_shannon(labels, base=None):
    """Shannon entropy of a sequence of labels.

    base=None gives natural log (scipy default); base=2 gives bits.
    The unique values themselves are not needed, only their counts.
    """
    _, counts = np.unique(labels, return_counts=True)
    return entropy(counts, base=base)
In [7]:
# Loading target file as raw bytes
# NOTE(review): relative path — run the notebook from its own directory
file_path = "../data/text/book-1.txt"
text_byte_list = get_file_bytes(file_path)
print(len(text_byte_list), 'Bytes')
1257099 Bytes

Theoretical compression percentage

In [8]:
# Function - Calculates the compression percentage (%)
def calc_compression_percentage(curr_size, new_size):
    """Percentage saved going from curr_size to new_size, rounded to 2 decimals."""
    saved_fraction = (curr_size - new_size) / curr_size
    return round(saved_fraction * 100, 2)
In [9]:
# Calculate entropy of file (base 2 -> bits per byte)
curr_entropy = entropy_shannon(text_byte_list, 2)
curr_entropy
Out[9]:
4.59266006221856
In [10]:
# Theoretical compression percentage (%), from the entropy lower bound
curr_size = 8  # bits per symbol in the uncompressed file
new_size = curr_entropy  # ideal average bits per symbol (Shannon limit)
compress_rate = calc_compression_percentage(curr_size, new_size)
print(compress_rate, '%')
42.59 %

Real compression percentage

In [11]:
# Function - Calculate code frequency
def get_term_freq(term_list):
    """Count occurrences of each term; integer terms are relabelled as chr(term)."""
    term_freq = {}

    for term, count in Counter(term_list).items():
        label = chr(term) if isinstance(term, int) else term
        term_freq[label] = count

    return term_freq

# Function - Build the compress file
def create_compress_file(byte_list, code_list):
    """Encode every byte with its Huffman codeword and return one bit string."""
    # Look up each byte's codeword (keys are chr() labels) and join them
    return "".join(code_list[chr(byte_value)] for byte_value in byte_list)

# Function - Compressing file
def get_compress_file(byte_list):
    """Huffman-compress a byte sequence.

    Returns a tuple (compress_file, h_code): the concatenated bit string
    and the symbol -> codeword dict used to produce it.
    """
    # Get symbols frequency. Raw counts are fine here: get_code normalizes
    # the weights itself, so the previous manual normalization pass was
    # redundant and has been removed.
    term_freq = get_term_freq(byte_list)

    # Get Huffman coding
    h_code = hc.get_code(term_freq)

    # Compressing file with Huffman code
    compress_file = create_compress_file(byte_list, h_code)

    return compress_file, h_code
In [12]:
# Compressing initial text file
compress_file, h_code = get_compress_file(text_byte_list)
In [13]:
# Real compression percentage (%), achieved by the Huffman code
curr_size = len(text_byte_list)  # original size in bytes
new_size = len(compress_file) / 8  # bit-string length converted to bytes
compress_rate = calc_compression_percentage(curr_size, new_size)
print(compress_rate, '%')
42.19 %

2. Changing Entropy for higher Compression

Below, the functions that allow you to find the best entropy configuration for a binary file.

In [14]:
# Function - Find the low entropy setup for target file
def find_low_entropy(byte_list, neighborhood, verbose):
    """Exhaustively search all 256 single-byte XOR keys for the transform
    that minimizes the Shannon entropy of the transformed byte stream.

    Each output byte is key ^ byte_list[i], additionally XORed with the
    previous `neighborhood` already-transformed bytes (chained transform).

    Returns a dict with 'byte_list' (best transformed stream), 'entropy'
    (its base-2 entropy) and 'key' (the winning XOR key, -1 if none beat
    the 8-bit upper bound).
    """
    B = 256  # alphabet size: all possible byte values
    best_entropy = math.log(B, 2)  # upper bound: 8 bits per byte
    best_byte_list = []
    best_key = -1
    n = len(byte_list)

    if verbose:
        print(n, 'Bytes')

    for curr_key in range(B):
        curr_byte_list = []

        for i in range(n):
            curr_value = curr_key ^ byte_list[i]

            # Chain with the previous `neighborhood` transformed bytes
            for j in range(i - neighborhood, i + 1):
                if i != j and j >= 0:
                    curr_value = curr_value ^ curr_byte_list[j]

            curr_byte_list.append(curr_value)

        # Get current entropy. The previous .copy() was a needless full-list
        # copy per key: entropy_shannon only reads its argument.
        curr_entropy = entropy_shannon(curr_byte_list, 2)

        # Update best option
        if curr_entropy < best_entropy:
            if verbose:
                print('key:', curr_key, ', last entropy:', best_entropy, ', new entropy:', curr_entropy)

            best_entropy = curr_entropy
            best_key = curr_key
            best_byte_list = curr_byte_list

    return { 'byte_list': best_byte_list, 'entropy': best_entropy, 'key': best_key}
In [15]:
# Find file with low entropy: exhaustive search over all 256 XOR keys
# (verbose=True prints every improvement found; O(256 * n), slow on big files)
best_option = find_low_entropy(text_byte_list, 0, True)
1257099 Bytes
key: 0 , last entropy: 8.0 , new entropy: 4.59266006221856
key: 33 , last entropy: 4.59266006221856 , new entropy: 4.592660062218559
In [16]:
# Show best entropy setup
best_option['entropy'], best_option['key']
Out[16]:
(4.592660062218559, 33)
In [17]:
# Save best byte list
best_byte_list = best_option['byte_list']
In [18]:
# Function - Create byte matrix
def create_byte_matrix(byte_list, row_len, col_len):
    """Pack byte_list row-by-row into a (row_len, col_len) float matrix.

    Trailing cells beyond len(byte_list) stay zero. Raises if byte_list
    holds more than row_len * col_len elements.
    """
    matrix = np.zeros((row_len, col_len))
    data = np.asarray(byte_list)

    # Row-major fill via the flat iterator — vectorized replacement for
    # the original per-element Python loop (same layout, O(n) in numpy)
    matrix.flat[:data.size] = data

    return matrix
In [19]:
# Function - Plot image in binary file
def plot_byte_matrix(matrix, plt_title):
    """Render a byte matrix as a seaborn heatmap with title and axis labels."""
    fig, ax = plt.subplots(figsize = (14, 14))
    sns.heatmap(matrix, ax = ax)

    # Title and axis labels so the figure stands alone when skimmed
    for setter, text, size in ((ax.set_title, plt_title, 16),
                               (ax.set_xlabel, 'columns', 12),
                               (ax.set_ylabel, 'rows', 12)):
        setter(text, fontsize = size)

    plt.show()
In [20]:
# Create and plot original matrix
row_len = 1300
col_len = 967
ori_matrix = create_byte_matrix(text_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of Original File')
In [21]:
# Create and plot the new matrix (with low entropy)
row_len = 1300
col_len = 967
# Renamed from `ori_matrix`: this matrix holds the TRANSFORMED file, not the original
new_matrix = create_byte_matrix(best_byte_list, row_len, col_len)
plot_byte_matrix(new_matrix, 'Matrix of New File')
In [22]:
# Save the new file as txt
temp_file_path = file_path.replace('.txt', '-new.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(best_byte_list)
    f.write(binary_format)

3. Restoring Entropy for Decompression

Below, the functions that allow you to reverse the entropy change.

In [23]:
# Restoring entropy process: undo the XOR transform applied by
# find_low_entropy, rebuilding the original byte stream back-to-front.
new_byte_list = []
neighborhood = 0  # must match the neighborhood used during the transform
n = len(best_byte_list)

# Walk backwards so later (chained) bytes are undone before earlier ones
for i in range(n-1, 0, -1):
    curr_value = best_option['key'] ^ best_byte_list[i]
    
    # NOTE(review): with neighborhood > 0 this reads new_byte_list[j], but
    # new_byte_list is appended in REVERSE order while j is a forward
    # position — verify correctness before using a non-zero neighborhood.
    # (Unused here since neighborhood = 0: the loop body never runs.)
    for j in range(i - neighborhood, i + 1):
        if i != j and j >= 0:
            curr_value = curr_value ^ new_byte_list[j]
    new_byte_list.append(curr_value)
    
# The first byte has no earlier neighbors: just undo the key XOR
new_byte_list.append(best_option['key'] ^ best_byte_list[0])
# Results were collected back-to-front, so flip into original order
new_byte_list = list(reversed(new_byte_list))

Validation

In [24]:
# Comparing size of byte lists
len(new_byte_list) == len(text_byte_list)
Out[24]:
True
In [25]:
# Comparing values of byte lists
# NOTE(review): equal sums do not imply identical content — a stronger
# check would be `new_byte_list == list(text_byte_list)`.
sum(new_byte_list) - sum(text_byte_list)
Out[25]:
0
In [26]:
# Comparing entropies of byte lists
entropy_shannon(text_byte_list, 2) == entropy_shannon(new_byte_list, 2)
Out[26]:
True

Plotting

In [27]:
# Create and plot the matrix of the RESTORED byte list (the previous comment
# wrongly said "new matrix (with low entropy)"); it should match the original
row_len = 1300
col_len = 967
restored_matrix = create_byte_matrix(new_byte_list, row_len, col_len)
plot_byte_matrix(restored_matrix, 'Matrix of Original File')
In [28]:
# Save the restored byte list as txt; the written file should be
# byte-identical to the original input file
temp_file_path = file_path.replace('.txt', '-original.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(new_byte_list)
    f.write(binary_format)